package com.wlinker.driver.opc.ua.utils;

import cn.wlinker.driver.common.utils.ConnectionCacheUtils;
import cn.wlinker.driver.common.utils.IConnectHelper;
import com.wlinker.driver.opc.ua.domain.OpcUaConnectBean;
import lombok.Getter;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
import org.eclipse.milo.opcua.sdk.client.api.UaSession;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem;
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * OPC UA连接助手
 *
 * @author gxsjx
 * @version 1.0
 * @date 2023/7/13
 * Copyright © wlinker.cn
 */
public class OpcUaConnectHelper implements IConnectHelper<OpcUaConnectBean, OpcUaClient> {

    @Getter
    private static final OpcUaConnectHelper instance = new OpcUaConnectHelper();

    private static final Map<String, Boolean> isCollectedMap = new HashMap<>();

    @Override
    public String getConnectionKey(OpcUaConnectBean connectBean) {
        return connectBean.getEndPointUrl();
    }

    @Override
    public OpcUaClient connect(OpcUaConnectBean connectBean) throws UaException, ExecutionException, InterruptedException {
        String endPointUrl = connectBean.getEndPointUrl();
        OpcUaClient opcUaClient = OpcUaClient.create(endPointUrl,
                endpoints ->
                        endpoints.stream()
                                .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
                                .findFirst(),
                configBuilder ->
                        configBuilder
                                .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
                                .setApplicationUri("urn:eclipse:milo:examples:client")
                                //访问方式
                                .setIdentityProvider(new AnonymousProvider())
                                .setRequestTimeout(UInteger.valueOf(5000))
                                .build()
        );
        opcUaClient.addSessionActivityListener(new SessionActivityListener() {
            @Override
            public void onSessionActive(UaSession session) {
                SessionActivityListener.super.onSessionActive(session);
                System.err.println("onSessionActive");
                isCollectedMap.put(getCacheKey(connectBean), true);
            }
        });
        opcUaClient.addFaultListener(serviceFault -> {
            System.err.println("onServiceFault");
            isCollectedMap.put(getCacheKey(connectBean), false);
        });
        opcUaClient.connect().get();
        return opcUaClient;
    }

    @Override
    public boolean isConnected(OpcUaConnectBean connectBean) {
        String key = getCacheKey(connectBean);
        return isCollectedMap.getOrDefault(key, false);
    }

    @Override
    public boolean disconnect(OpcUaConnectBean connectBean) {
        String key = getCacheKey(connectBean);
        ConnectionCacheUtils.get(key, OpcUaClient.class).disconnect();
        ConnectionCacheUtils.remove(key);
        isCollectedMap.remove(key);
        return true;
    }

    /**
     * 遍历树形节点
     *
     * @param client OPC UA客户端
     * @param uaNode 节点
     * @throws Exception
     */
    public Set<UaNode> listNode(OpcUaClient client, UaNode uaNode) throws Exception {
        List<? extends UaNode> nodes;
        Set<UaNode> uaNodes = new HashSet<>();
        if (uaNode == null) {
            nodes = client.getAddressSpace().browseNodes(Identifiers.ObjectsFolder);
        } else {
            nodes = client.getAddressSpace().browseNodes(uaNode);
            if (uaNode.getNodeClass().equals(NodeClass.Variable)) {
                uaNodes.add(uaNode);
            }
        }
        for (UaNode nd : nodes) {
            //排除命名空间不是2的节点
            UShort namespaceIndex = nd.getNodeId().getNamespaceIndex();
            UShort uShort = UShort.valueOf(2);
            if (!uShort.equals(namespaceIndex)) {
                continue;
            }
            //排除系统性节点，这些系统性节点名称一般都是以"_"开头
            String name = nd.getBrowseName().getName();
            Object identifier = nd.getNodeId().getIdentifier();
            if (Objects.requireNonNull(name).startsWith("_") || identifier.toString().contains("示例")) {
                continue;
            }
            //排除非变量节点
            Set<UaNode> uaNodes1 = listNode(client, nd);
            uaNodes1 = uaNodes1.stream()
                    .filter(t -> t.getNodeClass().equals(NodeClass.Variable))
                    .collect(Collectors.toSet());
            uaNodes.addAll(uaNodes1);
        }
        return uaNodes;
    }

    /**
     * 读取节点数据
     *
     * namespaceIndex可以通过UaExpert客户端去查询，一般来说这个值是2。
     * identifier也可以通过UaExpert客户端去查询，这个值=通道名称.设备名称.标记名称
     *
     * @param client
     * @param namespaceIndex
     * @param identifier
     * @throws Exception
     */
    public DataValue readNodeValue(OpcUaClient client, int namespaceIndex, String identifier) throws Exception {
        //节点
        NodeId nodeId = new NodeId(namespaceIndex, identifier);
        //读取节点数据
        return client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
    }


    /**
     * 写入节点数据
     *
     * @param client
     * @param namespaceIndex
     * @param identifier
     * @param value
     * @throws Exception
     * @return
     */
    public boolean writeNodeValue(OpcUaClient client, int namespaceIndex, String identifier, Object value) {
        //节点
        NodeId nodeId = new NodeId(namespaceIndex, identifier);
        //创建数据对象,此处的数据对象一定要定义类型，不然会出现类型错误，导致无法写入
        Variant targetValue = new Variant(value);
        DataValue newValue = new DataValue(targetValue, null, null);
        //写入节点数据
        StatusCode statusCode = client.writeValue(nodeId, newValue).join();
        return statusCode.isGood();
    }


    private static final AtomicInteger atomic = new AtomicInteger();
    /**
     * 订阅(单个)
     *
     * @param client
     * @param namespaceIndex
     * @param identifier
     * @throws Exception
     */
    public void subscribe(OpcUaClient client, int namespaceIndex, String identifier) throws Exception {
        //创建发布间隔1000ms的订阅对象
        client
                .getSubscriptionManager()
                .createSubscription(1000.0)
                .thenAccept(t -> {
                    //节点
                    NodeId nodeId = new NodeId(namespaceIndex, identifier);
                    ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
                    //创建监控的参数
                    MonitoringParameters parameters = new MonitoringParameters(UInteger.valueOf(atomic.getAndIncrement()), 1000.0, null, UInteger.valueOf(10), true);
                    //创建监控项请求
                    //该请求最后用于创建订阅。
                    MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                    List<MonitoredItemCreateRequest> requests = new ArrayList<>();
                    requests.add(request);
                    //创建监控项，并且注册变量值改变时候的回调函数。
                    t.createMonitoredItems(
                            TimestampsToReturn.Both,
                            requests,
                            (item, id) -> item.setValueConsumer((it, val) -> {
                                System.out.println("nodeid :" + it.getReadValueId().getNodeId());
                                System.out.println("value :" + val.getValue().getValue());
                            })
                    );
                }).get();

        //持续订阅
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 批量订阅
     *
     * @param client
     * @throws Exception
     */
    public void subscribeBatch(OpcUaClient client, int batchNamespaceIndex, String[] batchIdentifiers) throws Exception {
        final CountDownLatch eventLatch = new CountDownLatch(1);
        //处理订阅业务
        handlerMultipleNode(client, batchNamespaceIndex, batchIdentifiers);
        //持续监听
        eventLatch.await();
    }

    /**
     * 处理订阅业务
     *  @param client OPC UA客户端
     * @param batchNamespaceIndex
     * @param batchIdentifiers
     */
    private void handlerMultipleNode(OpcUaClient client, int batchNamespaceIndex, String[] batchIdentifiers) {
        try {
            //创建订阅
            ManagedSubscription subscription = ManagedSubscription.create(client);
            List<NodeId> nodeIdList = new ArrayList<>();
            for (String id : batchIdentifiers) {
                nodeIdList.add(new NodeId(batchNamespaceIndex, id));
            }
            //监听
            List<ManagedDataItem> dataItemList = subscription.createDataItems(nodeIdList);
            for (ManagedDataItem managedDataItem : dataItemList) {
                managedDataItem.addDataValueListener((t) -> {
                    System.out.println(managedDataItem.getNodeId().getIdentifier().toString() + ":" + t.getValue().getValue().toString());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
